---
title: "Python image processing example"
sidebarTitle: "Process images"
description: "Learn how to use Trigger.dev with Python to process images from URLs and upload them to S3."
---

import PythonLearnMore from "/snippets/python-learn-more.mdx";

## Overview

This demo showcases how to use Trigger.dev with Python to process an image using Pillow (PIL) from a URL and upload it to S3-compatible storage bucket.

## Prerequisites

- A project with [Trigger.dev initialized](/quick-start)
- [Python](https://www.python.org/) installed on your local machine

## Features

- A [Trigger.dev](https://trigger.dev) task to trigger the image processing Python script, and then upload the processed image to S3-compatible storage
- The [Trigger.dev Python build extension](https://trigger.dev/docs/config/extensions/pythonExtension) to install dependencies and run Python scripts
- [Pillow (PIL)](https://pillow.readthedocs.io/) for powerful image processing capabilities
- [AWS SDK v3](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/s3/) for S3 uploads
- S3-compatible storage support (AWS S3, Cloudflare R2, etc.)

## GitHub repo

<Card
  title="View the project on GitHub"
  icon="GitHub"
  href="https://github.com/triggerdotdev/examples/tree/main/python-image-processing"
>
  Click here to view the full code for this project in our examples repository on GitHub. You can
  fork it and use it as a starting point for your own project.
</Card>

## The code

### Build configuration

After you've initialized your project with Trigger.dev, add these build settings to your `trigger.config.ts` file:

```ts trigger.config.ts
import { pythonExtension } from "@trigger.dev/python/extension";
import { defineConfig } from "@trigger.dev/sdk";

export default defineConfig({
  runtime: "node",
  project: "<your-project-ref>",
  // Your other config settings...
  build: {
    extensions: [
      pythonExtension({
        // The path to your requirements.txt file
        requirementsFile: "./requirements.txt",
        // The path to your Python binary
        devPythonBinaryPath: `venv/bin/python`,
        // The paths to your Python scripts to run
        scripts: ["src/python/**/*.py"],
      }),
    ],
  },
});
```

<Info>
  Learn more about executing scripts in your Trigger.dev project using our Python build extension
  [here](/config/extensions/pythonExtension).
</Info>

### Task code

This task uses the `python.runScript` method to run the `image-processing.py` script with the given image URL as an argument. You can adjust the image processing parameters in the payload, with options such as height, width, quality, output format, etc.

```ts src/trigger/processImage.ts
import { schemaTask } from "@trigger.dev/sdk";
import { z } from "zod";
import { python } from "@trigger.dev/python";
import { promises as fs } from "fs";
import { S3Client } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";

// Initialize S3 client
const s3Client = new S3Client({
  region: "auto",
  endpoint: process.env.S3_ENDPOINT,
  credentials: {
    accessKeyId: process.env.S3_ACCESS_KEY_ID ?? "",
    secretAccessKey: process.env.S3_SECRET_ACCESS_KEY ?? "",
  },
});

// Define the input schema with Zod
const imageProcessingSchema = z.object({
  imageUrl: z.string().url(),
  height: z.number().positive().optional().default(800),
  width: z.number().positive().optional().default(600),
  quality: z.number().min(1).max(100).optional().default(85),
  maintainAspectRatio: z.boolean().optional().default(true),
  outputFormat: z.enum(["jpeg", "png", "webp", "gif", "avif"]).optional().default("jpeg"),
  brightness: z.number().optional(),
  contrast: z.number().optional(),
  sharpness: z.number().optional(),
  grayscale: z.boolean().optional().default(false),
});

// Define the output schema
const outputSchema = z.object({
  url: z.string().url(),
  key: z.string(),
  format: z.string(),
  originalSize: z.object({
    width: z.number(),
    height: z.number(),
  }),
  newSize: z.object({
    width: z.number(),
    height: z.number(),
  }),
  fileSizeBytes: z.number(),
  exitCode: z.number(),
});

export const processImage = schemaTask({
  id: "process-image",
  schema: imageProcessingSchema,
  run: async (payload, io) => {
    const {
      imageUrl,
      height,
      width,
      quality,
      maintainAspectRatio,
      outputFormat,
      brightness,
      contrast,
      sharpness,
      grayscale,
    } = payload;

    try {
      // Run the Python script
      const result = await python.runScript("./src/python/image-processing.py", [
        imageUrl,
        height.toString(),
        width.toString(),
        quality.toString(),
        maintainAspectRatio.toString(),
        outputFormat,
        brightness?.toString() || "null",
        contrast?.toString() || "null",
        sharpness?.toString() || "null",
        grayscale.toString(),
      ]);

      const { outputPath, format, originalSize, newSize, fileSizeBytes } = JSON.parse(
        result.stdout
      );

      // Read file once
      const fileContent = await fs.readFile(outputPath);

      try {
        // Upload to S3
        const key = `processed-images/${Date.now()}-${outputPath.split("/").pop()}`;
        await new Upload({
          client: s3Client,
          params: {
            Bucket: process.env.S3_BUCKET!,
            Key: key,
            Body: fileContent,
            ContentType: `image/${format}`,
          },
        }).done();

        return {
          url: `${process.env.S3_PUBLIC_URL}/${key}`,
          key,
          format,
          originalSize,
          newSize,
          fileSizeBytes,
          exitCode: result.exitCode,
        };
      } finally {
        // Always clean up the temp file
        await fs.unlink(outputPath).catch(console.error);
      }
    } catch (error) {
      throw new Error(
        `Processing failed: ${error instanceof Error ? error.message : "Unknown error"}`
      );
    }
  },
});
```

### Add a requirements.txt file

Add the following to your `requirements.txt` file. This is required in Python projects to install the dependencies.

```txt requirements.txt
# Core dependencies
Pillow==10.2.0            # Image processing library
python-dotenv==1.0.0      # Environment variable management
requests==2.31.0          # HTTP requests
numpy==1.26.3             # Numerical operations (for advanced processing)

# Optional enhancements
opencv-python==4.8.1.78   # For more advanced image processing
```

### The Python script

The Python script uses Pillow (PIL) to process an image. You can see the original script in our examples repository [here](https://github.com/triggerdotdev/examples/blob/main/python-image-processing/src/python/image-processing.py).

```python src/python/image-processing.py
from PIL import Image, ImageOps, ImageEnhance
import io
from io import BytesIO
import os
from typing import Tuple, List, Dict, Optional, Union
import logging
import sys
import json
import requests

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class ImageProcessor:
    """Image processing utility for resizing, optimizing, and converting images."""

    # Supported formats for conversion
    SUPPORTED_FORMATS = ['JPEG', 'PNG', 'WEBP', 'GIF', 'AVIF']

    @staticmethod
    def open_image(image_data: Union[bytes, str]) -> Image.Image:
        """Open an image from bytes or file path."""
        try:
            if isinstance(image_data, bytes):
                return Image.open(io.BytesIO(image_data))
            else:
                return Image.open(image_data)
        except Exception as e:
            logger.error(f"Failed to open image: {e}")
            raise ValueError(f"Could not open image: {e}")

    @staticmethod
    def resize_image(
        img: Image.Image,
        width: Optional[int] = None,
        height: Optional[int] = None,
        maintain_aspect_ratio: bool = True
    ) -> Image.Image:
        """
        Resize an image to specified dimensions.

        Args:
            img: PIL Image object
            width: Target width (None to auto-calculate from height)
            height: Target height (None to auto-calculate from width)
            maintain_aspect_ratio: Whether to maintain the original aspect ratio

        Returns:
            Resized PIL Image
        """
        if width is None and height is None:
            return img  # No resize needed

        original_width, original_height = img.size

        if maintain_aspect_ratio:
            if width and height:
                # Calculate the best fit while maintaining aspect ratio
                ratio = min(width / original_width, height / original_height)
                new_width = int(original_width * ratio)
                new_height = int(original_height * ratio)
            elif width:
                # Calculate height based on width
                ratio = width / original_width
                new_width = width
                new_height = int(original_height * ratio)
            else:
                # Calculate width based on height
                ratio = height / original_height
                new_width = int(original_width * ratio)
                new_height = height
        else:
            # Force exact dimensions
            new_width = width if width else original_width
            new_height = height if height else original_height

        return img.resize((new_width, new_height), Image.LANCZOS)

    @staticmethod
    def optimize_image(
        img: Image.Image,
        quality: int = 85,
        format: Optional[str] = None
    ) -> Tuple[bytes, str]:
        """
        Optimize an image for web delivery.

        Args:
            img: PIL Image object
            quality: JPEG/WebP quality (0-100)
            format: Output format (JPEG, PNG, WEBP, etc.)

        Returns:
            Tuple of (image_bytes, format)
        """
        if format is None:
            format = img.format or 'JPEG'

        format = format.upper()
        if format not in ImageProcessor.SUPPORTED_FORMATS:
            format = 'JPEG'  # Default to JPEG if unsupported format

        # Convert mode if needed
        if format == 'JPEG' and img.mode in ('RGBA', 'P'):
            img = img.convert('RGB')

        # Save to bytes
        buffer = io.BytesIO()

        if format == 'JPEG':
            img.save(buffer, format=format, quality=quality, optimize=True)
        elif format == 'PNG':
            img.save(buffer, format=format, optimize=True)
        elif format == 'WEBP':
            img.save(buffer, format=format, quality=quality)
        elif format == 'AVIF':
            img.save(buffer, format=format, quality=quality)
        else:
            img.save(buffer, format=format)

        buffer.seek(0)
        return buffer.getvalue(), format.lower()

    @staticmethod
    def apply_filters(
        img: Image.Image,
        brightness: Optional[float] = None,
        contrast: Optional[float] = None,
        sharpness: Optional[float] = None,
        grayscale: bool = False
    ) -> Image.Image:
        """
        Apply various filters and enhancements to an image.

        Args:
            img: PIL Image object
            brightness: Brightness factor (0.0-2.0, 1.0 is original)
            contrast: Contrast factor (0.0-2.0, 1.0 is original)
            sharpness: Sharpness factor (0.0-2.0, 1.0 is original)
            grayscale: Convert to grayscale if True

        Returns:
            Processed PIL Image
        """
        # Apply grayscale first if requested
        if grayscale:
            img = ImageOps.grayscale(img)
            # Convert back to RGB if other filters will be applied
            if any(x is not None for x in [brightness, contrast, sharpness]):
                img = img.convert('RGB')

        # Apply enhancements
        if brightness is not None:
            img = ImageEnhance.Brightness(img).enhance(brightness)

        if contrast is not None:
            img = ImageEnhance.Contrast(img).enhance(contrast)

        if sharpness is not None:
            img = ImageEnhance.Sharpness(img).enhance(sharpness)

        return img

    @staticmethod
    def process_image(
        image_data: Union[bytes, str],
        width: Optional[int] = None,
        height: Optional[int] = None,
        maintain_aspect_ratio: bool = True,
        quality: int = 85,
        output_format: Optional[str] = None,
        brightness: Optional[float] = None,
        contrast: Optional[float] = None,
        sharpness: Optional[float] = None,
        grayscale: bool = False
    ) -> Dict:
        """
        Process an image with all available options.

        Args:
            image_data: Image bytes or file path
            width: Target width
            height: Target height
            maintain_aspect_ratio: Whether to maintain aspect ratio
            quality: Output quality
            output_format: Output format
            brightness: Brightness adjustment
            contrast: Contrast adjustment
            sharpness: Sharpness adjustment
            grayscale: Convert to grayscale

        Returns:
            Dict with processed image data and metadata
        """
        # Open the image
        img = ImageProcessor.open_image(image_data)
        original_format = img.format
        original_size = img.size

        # Apply filters
        img = ImageProcessor.apply_filters(
            img,
            brightness=brightness,
            contrast=contrast,
            sharpness=sharpness,
            grayscale=grayscale
        )

        # Resize if needed
        if width or height:
            img = ImageProcessor.resize_image(
                img,
                width=width,
                height=height,
                maintain_aspect_ratio=maintain_aspect_ratio
            )

        # Optimize and get bytes
        processed_bytes, actual_format = ImageProcessor.optimize_image(
            img,
            quality=quality,
            format=output_format
        )

        # Return result with metadata
        return {
            "processed_image": processed_bytes,
            "format": actual_format,
            "original_format": original_format,
            "original_size": original_size,
            "new_size": img.size,
            "file_size_bytes": len(processed_bytes)
        }

def process_image(url, height, width, quality):
    # Download image from URL
    response = requests.get(url)
    img = Image.open(BytesIO(response.content))

    # Resize
    img = img.resize((int(width), int(height)), Image.Resampling.LANCZOS)

    # Save with quality setting
    output_path = f"/tmp/processed_{width}x{height}.jpg"
    img.save(output_path, "JPEG", quality=int(quality))

    return output_path

if __name__ == "__main__":
    url = sys.argv[1]
    height = int(sys.argv[2])
    width = int(sys.argv[3])
    quality = int(sys.argv[4])
    maintain_aspect_ratio = sys.argv[5].lower() == 'true'
    output_format = sys.argv[6]
    brightness = float(sys.argv[7]) if sys.argv[7] != 'null' else None
    contrast = float(sys.argv[8]) if sys.argv[8] != 'null' else None
    sharpness = float(sys.argv[9]) if sys.argv[9] != 'null' else None
    grayscale = sys.argv[10].lower() == 'true'

    processor = ImageProcessor()
    result = processor.process_image(
        requests.get(url).content,
        width=width,
        height=height,
        maintain_aspect_ratio=maintain_aspect_ratio,
        quality=quality,
        output_format=output_format,
        brightness=brightness,
        contrast=contrast,
        sharpness=sharpness,
        grayscale=grayscale
    )

    output_path = f"/tmp/processed_{width}x{height}.{result['format']}"
    with open(output_path, 'wb') as f:
        f.write(result['processed_image'])

    print(json.dumps({
        "outputPath": output_path,
        "format": result['format'],
        "originalSize": result['original_size'],
        "newSize": result['new_size'],
        "fileSizeBytes": result['file_size_bytes']
    }))
```

## Testing your task

1. Create a virtual environment `python -m venv venv`
2. Activate the virtual environment, depending on your OS: On Mac/Linux: `source venv/bin/activate`, on Windows: `venv\Scripts\activate`
3. Install the Python dependencies `pip install -r requirements.txt`
4. Set up your S3-compatible storage credentials in your environment variables, in .env for local development, or in the Trigger.dev dashboard for production:
   ```
   S3_ENDPOINT=https://your-endpoint.com
   S3_ACCESS_KEY_ID=your-access-key
   S3_SECRET_ACCESS_KEY=your-secret-key
   S3_BUCKET=your-bucket-name
   S3_PUBLIC_URL=https://your-public-url.com
   ```
5. Copy the project ref from your [Trigger.dev dashboard](https://cloud.trigger.dev) and add it to the `trigger.config.ts` file.
6. Run the Trigger.dev CLI `dev` command (it may ask you to authorize the CLI if you haven't already).
7. Test the task in the dashboard by providing a valid image URL and processing options.
8. Deploy the task to production using the Trigger.dev CLI `deploy` command.

## Example Payload

These are all optional parameters that can be passed to the `image-processing.py` Python script from the `processImage.ts` task.

```json
{
  "imageUrl": "<your-image-url>",
  "height": 1200,
  "width": 900,
  "quality": 90,
  "maintainAspectRatio": true,
  "outputFormat": "webp",
  "brightness": 1.2,
  "contrast": 1.1,
  "sharpness": 1.3,
  "grayscale": false
}
```

## Deploying your task

Deploy the task to production using the CLI command `npx trigger.dev@latest deploy`

<PythonLearnMore />
